package dbx

import (
	"encoding/json"
	"errors"
	"fmt"
	"gitee.com/zhongguo168a/go-nodex/logx"
	"gitee.com/zhongguo168a/gocodes/datax"
	"gitee.com/zhongguo168a/gocodes/myx/errorx"
	"gitee.com/zhongguo168a/gocodes/myx/slowtime"
	"github.com/oxequa/grace"
	"github.com/syndtr/goleveldb/leveldb"
	"strconv"
	"sync"
	"time"
)

var saveRoot *SaveRoot

func init() {
	saveRoot = newSaveRoot()
	saveRoot.enableSaveByTime()
}

func GetSaveRoot() *SaveRoot {
	return saveRoot
}

func newSaveRoot() (obj *SaveRoot) {
	obj = &SaveRoot{
		saveGroups: NewSaveGroupResp(),
		saveResp:   NewSaveResp(),
	}
	obj.SetConfig(&SaveConfig{
		MinTimeConfig: 1,
		FilePath:      "mongo/saveitems.data",
	})

	return
}

type SaveConfig struct {
	// 最小更新时间-配置，单位秒
	// 创建时间超过 最小更新时间 后，才允许加入存储列表
	MinTimeConfig int
	// 保存路径
	FilePath string
}

// SaveRoot
// 目前只支持mongodb
// 目前不支持分库分表
type SaveRoot struct {
	mutex sync.Mutex

	config *SaveConfig

	// saveKey to saveItems
	// saveKey 通常为角色，公会编号，或指定的一次delay操作
	saveGroups *SaveGroupResp
	//
	saveResp *SaveResp

	// 最小更新时间
	saveMinTime int
	//
	saveAvgTime int
	//
	reduceCount int
	// 记录N次更新时间的累计，用来求平均数，计算更新间隔
	// 小于等于100的
	saveTimeTotal int
	saveTimeCount int

	// 优先存储队列
	// 通常为失败的存储项
	firstSaveList []*SaveItem
}

func (root *SaveRoot) GetSaveItem(key string) *SaveItem {
	return root.saveResp.MustGet(key)
}
func (root *SaveRoot) SetConfig(config *SaveConfig) {
	root.config = config
	root.saveMinTime = config.MinTimeConfig
}

// RestoreSavedMap 还原指定路径的数据到数据库
func (root *SaveRoot) RestoreSavedMap(path string) (err error) {
	db, err := leveldb.OpenFile(path, nil)
	defer db.Close()

	iter := db.NewIterator(nil, nil)

	var items []*SaveItem
	for iter.Next() {
		value := iter.Value()
		item := &SaveItem{}
		err = json.Unmarshal(value, item)
		if err != nil {
			err = errorx.Wrap(err, "json unmarshal")
			return
		}
		items = append(items, item)
	}
	iter.Release()
	err = iter.Error()
	if err != nil {
		err = errorx.Wrap(err, "iter")
		return
	}

	root.mutex.Lock()
	for _, val := range items {
		root.saveResp.Set(val.Ident, val)
		root.firstSaveList = append(root.firstSaveList, val)
	}
	root.mutex.Unlock()

	root.处理存储集并保存(false)
	return
}

func (root *SaveRoot) SaveNow() {
	root.处理存储集并保存(false)
}

// SaveOnStop 进程停止时，保存变化到数据库，如果保存失败，写入文件避免数据丢失，等待处理
// todo: 上线前要做好失败与还原的测试
func (root *SaveRoot) SaveOnStop() error {
	root.处理存储集并保存(false)

	if root.config.FilePath == "" {
		return errorx.New("未设置文件路径，失败的修改项无法保存")
	}

	db, err := leveldb.OpenFile(root.config.FilePath, nil)
	if err != nil {
		return errorx.Wrap(err, "open file", datax.M{"FilePaht": root.config.FilePath})
	}
	defer db.Close()

	root.mutex.Lock()
	for i, item := range root.saveResp.Items() {
		key := "saveItem_" + strconv.Itoa(i)
		value, jsonerr := json.Marshal(item)
		if jsonerr != nil {
			logx.Error(errorx.Wrap(jsonerr, fmt.Sprintf("SaveOnStop, json.Marshal, item=%+v", item)))
			continue
		}
		err = db.Put([]byte(key), value, nil)
		if err != nil {
			err = errorx.Wrap(err, fmt.Sprintf("put %v", key))
			return err
		}
	}
	root.mutex.Unlock()
	return nil
}

func (root *SaveRoot) setSaveMinTime(val int) {
	if val < root.config.MinTimeConfig {
		root.saveMinTime = root.config.MinTimeConfig
	} else {
		root.saveMinTime = val
	}
}

// enableSaveByTime
func (root *SaveRoot) enableSaveByTime() {
	go func() {
		var err error
		grace.Recover(&err)

		for {
			root.处理存储集并保存(true)
			time.Sleep(time.Millisecond * 1000) // 每100毫秒检查一次
		}
	}()
}

func (root *SaveRoot) watchSaveTime(start time.Time) {
	end := slowtime.Now()
	diffTime := end.Sub(start).Milliseconds()
	//
	root.mutex.Lock()
	defer root.mutex.Unlock()

	root.saveTimeCount++
	if root.saveTimeCount > 100 {
		root.saveTimeCount = 100
	}

	root.justySaveMinTime(int(diffTime))
}

func (root *SaveRoot) justySaveMinTime(difftime int) {
	total := root.saveTimeTotal
	length := root.saveTimeCount

	count99 := total - root.saveAvgTime
	total = (count99 + difftime) / length
	// 平均值/毫秒
	avg := int(total / length)
	inc := 0
	if avg > 80 {
		if avg > root.saveAvgTime {
			root.reduceCount = 0
			inc++
		} else {
			root.reduceCount++
			if root.reduceCount%10 == 0 {
				inc--
			}
		}
		root.setSaveMinTime(root.saveMinTime + inc)
	} else {
		root.setSaveMinTime(root.config.MinTimeConfig)
	}
	//fmt.Printf("justySaveMinTime: avg=%v/%v, lastTime=%v\n", avg, saveRoot.saveAvgTime, saveRoot.saveMinTime)
	root.saveAvgTime = avg
	root.saveTimeTotal = total
}

// 需要检查创建时间: 当超过创建时间某个时长后，数据变化才会更新到数据库
func (root *SaveRoot) 处理存储集并保存(需要检查创建时间 bool) {
	defer root.watchSaveTime(slowtime.Now())

	root.mutex.Lock()
	defer root.mutex.Unlock()

	savedict := NewSaveDict()
	saveResp := root.saveResp

	for _, saveItem := range root.firstSaveList {
		savedict.Set(saveItem.Ident, saveItem)
	}
	groupList := root.saveGroups.ListByTime(需要检查创建时间, root.saveMinTime)
	for _, group := range groupList {
		//
		_ = group.ForRange(func(key string, val int) (err error) {
			saveItem, existed := saveResp.Pop(key)
			if !existed {
				// 如果保存项不存在，表示已经有另外的组处理过并删除掉
				return
			}
			if saveItem.CreateTime > val {
				// 如果保存项的创建时间少于保存组的更新时间，表示已经有另外的组处理过并删除掉后，有新的保存组创建了保存项
				// 当前的保存组的时效已过
				return
			}

			savedict.Set(saveItem.Ident, saveItem)
			return nil
		})

	}
	//
	if savedict.Length() > 0 {
		root.saveSaveDict(savedict)
	}
}

func (root *SaveRoot) saveSaveDict(dict *SaveDict) {
	tables := dict.GroupByTable()
	for _, val := range tables {
		root.saveTables(val)
	}
}

func (root *SaveRoot) saveTables(items []*SaveItem) {
	if len(items) > 0 {
		firstItem := items[0]
		table, has := GetTable(firstItem.Table)
		if !has {
			logx.Error("table not found: ", firstItem.Table)
			return
		}
		db, has := GetDB(table.GetDatabase())
		if !has {
			logx.Error("database not found: ", table.GetDatabase())
			return
		}
		failItems, err := db.SaveTables(items)
		if err != nil {
			if errors.Is(err, ErrDatabaseException) {
				databaseException = true
			} else {
				logx.Error(errorx.Wrap(err, "saveTables"))
			}
		}
		// 添加到错误处理队列
		if len(failItems) > 0 {
			for _, val := range failItems {
				root.firstSaveList = append(root.firstSaveList, val)
			}
		}
		databaseException = false
	}
}

func (root *SaveRoot) SaveByGroup(groupKey string) (err error) {
	group := root.saveGroups.MustGet(groupKey)
	if group == nil {
		return errorx.New("group not found", datax.M{"key": groupKey})
	}

	dict := root.saveResp.DictByGroup(group)
	root.saveSaveDict(dict)
	return nil
}
